﻿#if NET40
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ProducerConsumerQueues.cs
//
//
// Specialized producer/consumer queues.
//
//
// ************<IMPORTANT NOTE>*************
//
// There are two exact copies of this file:
//  src\ndp\clr\src\bcl\system\threading\tasks\producerConsumerQueue.cs
//  src\ndp\fx\src\dataflow\system\threading\tasks\dataflow\internal\producerConsumerQueue.cs
// Keep both of them consistent by changing the other file when you change this one, also avoid:
//  1- To reference internal types in mscorlib
//  2- To reference any dataflow specific types
// This should be fixed post Dev11 when this class becomes public.
//
// ************</IMPORTANT NOTE>*************
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using System.Linq;
using System.Runtime.InteropServices;

namespace System.Threading.Tasks
{
  #region == interface IProducerConsumerQueue<T> ==

  /// <summary>Represents a producer/consumer queue used internally by dataflow blocks.</summary>
  /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
  internal interface IProducerConsumerQueue<T> : IEnumerable<T>
  {
    /// <summary>Enqueues an item into the queue.</summary>
    /// <param name="item">The item to enqueue.</param>
    /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
    void Enqueue(T item);

    /// <summary>Attempts to dequeue an item from the queue.</summary>
    /// <param name="result">The dequeued item.</param>
    /// <returns>true if an item could be dequeued; otherwise, false.</returns>
    /// <remarks>This method is meant to be thread-safe subject to the particular nature of the implementation.</remarks>
    Boolean TryDequeue(out T result);

    /// <summary>Gets whether the collection is currently empty.</summary>
    /// <remarks>This method may or may not be thread-safe.</remarks>
    Boolean IsEmpty { get; }

    /// <summary>Gets the number of items in the collection.</summary>
    /// <remarks>In many implementations, this method will not be thread-safe.</remarks>
    Int32 Count { get; }

    /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
    /// <param name="syncObj">The sync object used to lock</param>
    /// <returns>The collection count</returns>
    Int32 GetCountSafe(object syncObj);
  }

  #endregion

  #region == class MultiProducerMultiConsumerQueue<T> ==

  /// <summary>Provides a producer/consumer queue safe to be used by any number of producers and consumers concurrently.</summary>
  /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
  [DebuggerDisplay("Count = {Count}")]
  internal sealed class MultiProducerMultiConsumerQueue<T> : ConcurrentQueue<T>, IProducerConsumerQueue<T>
  {
    /// <summary>Enqueues an item into the queue.</summary>
    /// <param name="item">The item to enqueue.</param>
    void IProducerConsumerQueue<T>.Enqueue(T item) { base.Enqueue(item); }

    /// <summary>Attempts to dequeue an item from the queue.</summary>
    /// <param name="result">The dequeued item.</param>
    /// <returns>true if an item could be dequeued; otherwise, false.</returns>
    Boolean IProducerConsumerQueue<T>.TryDequeue(out T result) { return base.TryDequeue(out result); }

    /// <summary>Gets whether the collection is currently empty.</summary>
    Boolean IProducerConsumerQueue<T>.IsEmpty { get { return base.IsEmpty; } }

    /// <summary>Gets the number of items in the collection.</summary>
    Int32 IProducerConsumerQueue<T>.Count { get { return base.Count; } }

    /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
    /// <remarks>ConcurrentQueue.Count is thread safe, no need to acquire the lock.</remarks>
    Int32 IProducerConsumerQueue<T>.GetCountSafe(Object syncObj) { return base.Count; }
  }

  #endregion

  #region == class SingleProducerSingleConsumerQueue<T> ==

  /// <summary>Provides a producer/consumer queue safe to be used by only one producer and one consumer concurrently.</summary>
  /// <typeparam name="T">Specifies the type of data contained in the queue.</typeparam>
  [DebuggerDisplay("Count = {Count}")]
  [DebuggerTypeProxy(typeof(SingleProducerSingleConsumerQueue<>.SingleProducerSingleConsumerQueue_DebugView))]
  internal sealed class SingleProducerSingleConsumerQueue<T> : IProducerConsumerQueue<T>
  {
    #region - Design -

    // SingleProducerSingleConsumerQueue (SPSCQueue) is a concurrent queue designed to be used
    // by one producer thread and one consumer thread. SPSCQueue does not work correctly when used by
    // multiple producer threads concurrently or multiple consumer threads concurrently.
    //
    // SPSCQueue is based on segments that behave like circular buffers. Each circular buffer is represented
    // as an array with two indexes: _first and _last. _first is the index of the array slot for the consumer
    // to read next, and _last is the slot for the producer to write next. The circular buffer is empty when
    // (_first == _last), and full when ((_last+1) % _array.Length == _first).
    //
    // Since _first is only ever modified by the consumer thread and _last by the producer, the two indices can
    // be updated without interlocked operations. As long as the queue size fits inside a single circular buffer,
    // enqueues and dequeues simply advance the corresponding indices around the circular buffer. If an enqueue finds
    // that there is no room in the existing buffer, however, a new circular buffer is allocated that is twice as big
    // as the old buffer. From then on, the producer will insert values into the new buffer. The consumer will first
    // empty out the old buffer and only then follow the producer into the new (larger) buffer.
    //
    // As described above, the enqueue operation on the fast path only modifies the _first field of the current segment.
    // However, it also needs to read _last in order to verify that there is room in the current segment. Similarly, the
    // dequeue operation on the fast path only needs to modify _last, but also needs to read _first to verify that the
    // queue is non-empty. This results in true cache line sharing between the producer and the consumer.
    //
    // The cache line sharing issue can be mitigating by having a possibly stale copy of _first that is owned by the producer,
    // and a possibly stale copy of _last that is owned by the consumer. So, the consumer state is described using
    // (_first, _lastCopy) and the producer state using (_firstCopy, _last). The consumer state is separated from
    // the producer state by padding, which allows fast-path enqueues and dequeues from hitting shared cache lines.
    // _lastCopy is the consumer's copy of _last. Whenever the consumer can tell that there is room in the buffer
    // simply by observing _lastCopy, the consumer thread does not need to read _last and thus encounter a cache miss. Only
    // when the buffer appears to be empty will the consumer refresh _lastCopy from _last. _firstCopy is used by the producer
    // in the same way to avoid reading _first on the hot path.

    #endregion

    #region @ Fields @

    /// <summary>The initial size to use for segments (in number of elements).</summary>
    private const Int32 INIT_SEGMENT_SIZE = 32; // must be a power of 2

    /// <summary>The maximum size to use for segments (in number of elements).</summary>
    private const Int32 MAX_SEGMENT_SIZE = 0x1000000; // this could be made as large as Int32.MaxValue / 2

    /// <summary>The head of the linked list of segments.</summary>
    private volatile Segment _head;

    /// <summary>The tail of the linked list of segments.</summary>
    private volatile Segment _tail;

    #endregion

    #region @ Constructors @

    /// <summary>Initializes the queue.</summary>
    internal SingleProducerSingleConsumerQueue()
    {
      // Validate constants in ctor rather than in an explicit cctor that would cause perf degradation
      Debug.Assert(INIT_SEGMENT_SIZE > 0, "Initial segment size must be > 0.");
      Debug.Assert((INIT_SEGMENT_SIZE & (INIT_SEGMENT_SIZE - 1)) == 0, "Initial segment size must be a power of 2");
      Debug.Assert(INIT_SEGMENT_SIZE <= MAX_SEGMENT_SIZE, "Initial segment size should be <= maximum.");
      Debug.Assert(MAX_SEGMENT_SIZE < Int32.MaxValue / 2, "Max segment size * 2 must be < Int32.MaxValue, or else overflow could occur.");

      // Initialize the queue
      _head = _tail = new Segment(INIT_SEGMENT_SIZE);
    }

    #endregion

    #region - Enqueue -

    /// <summary>Enqueues an item into the queue.</summary>
    /// <param name="item">The item to enqueue.</param>
    public void Enqueue(T item)
    {
      var segment = _tail;
      var array = segment._array;
      var last = segment._state._last; // local copy to avoid multiple volatile reads

      // Fast path: there's obviously room in the current segment
      var tail2 = (last + 1) & (array.Length - 1);
      if (tail2 != segment._state._firstCopy)
      {
        array[last] = item;
        segment._state._last = tail2;
      }
      // Slow path: there may not be room in the current segment.
      else { EnqueueSlow(item, ref segment); }
    }

    #endregion

    #region * EnqueueSlow *

    /// <summary>Enqueues an item into the queue.</summary>
    /// <param name="item">The item to enqueue.</param>
    /// <param name="segment">The segment in which to first attempt to store the item.</param>
    private void EnqueueSlow(T item, ref Segment segment)
    {
      Debug.Assert(segment != null, "Expected a non-null segment.");

      if (segment._state._firstCopy != segment._state._first)
      {
        segment._state._firstCopy = segment._state._first;
        Enqueue(item); // will only recur once for this enqueue operation
        return;
      }

      var newSegmentSize = _tail._array.Length << 1; // double size
      Debug.Assert(newSegmentSize > 0, "The max size should always be small enough that we don't overflow.");
      if (newSegmentSize > MAX_SEGMENT_SIZE) { newSegmentSize = MAX_SEGMENT_SIZE; }

      var newSegment = new Segment(newSegmentSize);
      newSegment._array[0] = item;
      newSegment._state._last = 1;
      newSegment._state._lastCopy = 1;

      try { }
      finally
      {
        // Finally block to protect against corruption due to a thread abort
        // between setting _next and setting _tail.
        Volatile.Write(ref _tail._next, newSegment); // ensure segment not published until item is fully stored
        _tail = newSegment;
      }
    }

    #endregion

    #region - TryDequeue -

    /// <summary>Attempts to dequeue an item from the queue.</summary>
    /// <param name="result">The dequeued item.</param>
    /// <returns>true if an item could be dequeued; otherwise, false.</returns>
    public Boolean TryDequeue(out T result)
    {
      var segment = _head;
      var array = segment._array;
      var first = segment._state._first; // local copy to avoid multiple volatile reads

      // Fast path: there's obviously data available in the current segment
      if (first != segment._state._lastCopy)
      {
        result = array[first];
        array[first] = default(T); // Clear the slot to release the element
        segment._state._first = (first + 1) & (array.Length - 1);
        return true;
      }
      // Slow path: there may not be data available in the current segment
      else
      {
        return TryDequeueSlow(ref segment, ref array, out result);
      }
    }

    #endregion

    #region * TryDequeueSlow *

    /// <summary>Attempts to dequeue an item from the queue.</summary>
    /// <param name="array">The array from which the item was dequeued.</param>
    /// <param name="segment">The segment from which the item was dequeued.</param>
    /// <param name="result">The dequeued item.</param>
    /// <returns>true if an item could be dequeued; otherwise, false.</returns>
    private Boolean TryDequeueSlow(ref Segment segment, ref T[] array, out T result)
    {
      Debug.Assert(segment != null, "Expected a non-null segment.");
      Debug.Assert(array != null, "Expected a non-null item array.");

      if (segment._state._last != segment._state._lastCopy)
      {
        segment._state._lastCopy = segment._state._last;
        return TryDequeue(out result); // will only recur once for this dequeue operation
      }

      if (segment._next != null && segment._state._first == segment._state._last)
      {
        segment = segment._next;
        array = segment._array;
        _head = segment;
      }

      Int32 first = segment._state._first; // local copy to avoid extraneous volatile reads

      if (first == segment._state._last)
      {
        result = default(T);
        return false;
      }

      result = array[first];
      array[first] = default(T); // Clear the slot to release the element
      segment._state._first = (first + 1) & (segment._array.Length - 1);
      segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy

      return true;
    }

    #endregion

    #region - TryPeek -

    /// <summary>Attempts to peek at an item in the queue.</summary>
    /// <param name="result">The peeked item.</param>
    /// <returns>true if an item could be peeked; otherwise, false.</returns>
    public Boolean TryPeek(out T result)
    {
      var segment = _head;
      var array = segment._array;
      var first = segment._state._first; // local copy to avoid multiple volatile reads

      // Fast path: there's obviously data available in the current segment
      if (first != segment._state._lastCopy)
      {
        result = array[first];
        return true;
      }
      // Slow path: there may not be data available in the current segment
      else
      {
        return TryPeekSlow(ref segment, ref array, out result);
      }
    }

    #endregion

    #region * TryPeekSlow *

    /// <summary>Attempts to peek at an item in the queue.</summary>
    /// <param name="array">The array from which the item is peeked.</param>
    /// <param name="segment">The segment from which the item is peeked.</param>
    /// <param name="result">The peeked item.</param>
    /// <returns>true if an item could be peeked; otherwise, false.</returns>
    private Boolean TryPeekSlow(ref Segment segment, ref T[] array, out T result)
    {
      Debug.Assert(segment != null, "Expected a non-null segment.");
      Debug.Assert(array != null, "Expected a non-null item array.");

      if (segment._state._last != segment._state._lastCopy)
      {
        segment._state._lastCopy = segment._state._last;
        return TryPeek(out result); // will only recur once for this peek operation
      }

      if (segment._next != null && segment._state._first == segment._state._last)
      {
        segment = segment._next;
        array = segment._array;
        _head = segment;
      }

      Int32 first = segment._state._first; // local copy to avoid extraneous volatile reads

      if (first == segment._state._last)
      {
        result = default(T);
        return false;
      }

      result = array[first];
      return true;
    }

    #endregion

    #region - TryDequeueIf -

    /// <summary>Attempts to dequeue an item from the queue.</summary>
    /// <param name="predicate">The predicate that must return true for the item to be dequeued.  If null, all items implicitly return true.</param>
    /// <param name="result">The dequeued item.</param>
    /// <returns>true if an item could be dequeued; otherwise, false.</returns>
    public Boolean TryDequeueIf(Predicate<T> predicate, out T result)
    {
      var segment = _head;
      var array = segment._array;
      var first = segment._state._first; // local copy to avoid multiple volatile reads

      // Fast path: there's obviously data available in the current segment
      if (first != segment._state._lastCopy)
      {
        result = array[first];
        if (predicate == null || predicate(result))
        {
          array[first] = default(T); // Clear the slot to release the element
          segment._state._first = (first + 1) & (array.Length - 1);
          return true;
        }
        else
        {
          result = default(T);
          return false;
        }
      }
      // Slow path: there may not be data available in the current segment
      else
      {
        return TryDequeueIfSlow(predicate, ref segment, ref array, out result);
      }
    }

    #endregion

    #region * TryDequeueIfSlow *

    /// <summary>Attempts to dequeue an item from the queue.</summary>
    /// <param name="predicate">The predicate that must return true for the item to be dequeued.  If null, all items implicitly return true.</param>
    /// <param name="array">The array from which the item was dequeued.</param>
    /// <param name="segment">The segment from which the item was dequeued.</param>
    /// <param name="result">The dequeued item.</param>
    /// <returns>true if an item could be dequeued; otherwise, false.</returns>
    private Boolean TryDequeueIfSlow(Predicate<T> predicate, ref Segment segment, ref T[] array, out T result)
    {
      Debug.Assert(segment != null, "Expected a non-null segment.");
      Debug.Assert(array != null, "Expected a non-null item array.");

      if (segment._state._last != segment._state._lastCopy)
      {
        segment._state._lastCopy = segment._state._last;
        return TryDequeueIf(predicate, out result); // will only recur once for this dequeue operation
      }

      if (segment._next != null && segment._state._first == segment._state._last)
      {
        segment = segment._next;
        array = segment._array;
        _head = segment;
      }

      var first = segment._state._first; // local copy to avoid extraneous volatile reads

      if (first == segment._state._last)
      {
        result = default(T);
        return false;
      }

      result = array[first];
      if (predicate == null || predicate(result))
      {
        array[first] = default(T); // Clear the slot to release the element
        segment._state._first = (first + 1) & (segment._array.Length - 1);
        segment._state._lastCopy = segment._state._last; // Refresh _lastCopy to ensure that _first has not passed _lastCopy
        return true;
      }
      else
      {
        result = default(T);
        return false;
      }
    }

    #endregion

    #region - Clear -

    public void Clear()
    {
      T ignored;
      while (TryDequeue(out ignored)) ;
    }

    #endregion

    #region - IEnumerator Members -

    /// <summary>Gets an enumerable for the collection.</summary>
    /// <remarks>WARNING: This should only be used for debugging purposes.  It is not safe to be used concurrently.</remarks>
    public IEnumerator<T> GetEnumerator()
    {
      for (Segment segment = _head; segment != null; segment = segment._next)
      {
        for (Int32 pt = segment._state._first;
            pt != segment._state._last;
            pt = (pt + 1) & (segment._array.Length - 1))
        {
          yield return segment._array[pt];
        }
      }
    }

    /// <summary>Gets an enumerable for the collection.</summary>
    /// <remarks>WARNING: This should only be used for debugging purposes.  It is not safe to be used concurrently.</remarks>
    IEnumerator IEnumerable.GetEnumerator()
    {
      return GetEnumerator();
    }

    #endregion

    #region - IProducerConsumerQueue Members -

    /// <summary>Gets whether the collection is currently empty.</summary>
    /// <remarks>WARNING: This should not be used concurrently without further vetting.</remarks>
    public Boolean IsEmpty
    {
      // This implementation is optimized for calls from the consumer.
      get
      {
        Segment head = _head;
        if (head._state._first != head._state._lastCopy) { return false; } // _first is volatile, so the read of _lastCopy cannot get reordered
        if (head._state._first != head._state._last) { return false; }
        return head._next == null;
      }
    }

    /// <summary>Gets the number of items in the collection.</summary>
    /// <remarks>WARNING: This should only be used for debugging purposes.  It is not meant to be used concurrently.</remarks>
    public Int32 Count
    {
      get
      {
        var count = 0;
        for (Segment segment = _head; segment != null; segment = segment._next)
        {
          var arraySize = segment._array.Length;
          Int32 first, last;
          while (true) // Count is not meant to be used concurrently, but this helps to avoid issues if it is
          {
            first = segment._state._first;
            last = segment._state._last;
            if (first == segment._state._first) { break; }
          }
          count += (last - first) & (arraySize - 1);
        }
        return count;
      }
    }

    /// <summary>A thread-safe way to get the number of items in the collection. May synchronize access by locking the provided synchronization object.</summary>
    /// <remarks>The Count is not thread safe, so we need to acquire the lock.</remarks>
    Int32 IProducerConsumerQueue<T>.GetCountSafe(Object syncObj)
    {
      Debug.Assert(syncObj != null, "The syncObj parameter is null.");
      lock (syncObj)
      {
        return Count;
      }
    }

    #endregion

    #region * class Segment *

    /// <summary>A segment in the queue containing one or more items.</summary>
    [StructLayout(LayoutKind.Sequential)]
    private sealed class Segment
    {
      /// <summary>The next segment in the linked list of segments.</summary>
      internal Segment _next;

      /// <summary>The data stored in this segment.</summary>
      internal readonly T[] _array;

      /// <summary>Details about the segment.</summary>
      internal SegmentState _state; // separated out to enable StructLayout attribute to take effect

      /// <summary>Initializes the segment.</summary>
      /// <param name="size">The size to use for this segment.</param>
      internal Segment(Int32 size)
      {
        Debug.Assert((size & (size - 1)) == 0, "Size must be a power of 2");
        _array = new T[size];
      }
    }

    #endregion

    #region * struct SegmentState *

    /// <summary>Stores information about a segment.</summary>
    [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
    private struct SegmentState
    {
      /// <summary>Padding to reduce false sharing between the segment's array and _first.</summary>
      internal PaddingFor32 _pad0;

      /// <summary>The index of the current head in the segment.</summary>
      internal volatile Int32 _first;

      /// <summary>A copy of the current tail index.</summary>
      internal Int32 _lastCopy; // not volatile as read and written by the producer, except for IsEmpty, and there _lastCopy is only read after reading the volatile _first

      /// <summary>Padding to reduce false sharing between the first and last.</summary>
      internal PaddingFor32 _pad1;

      /// <summary>A copy of the current head index.</summary>
      internal Int32 _firstCopy; // not volatile as only read and written by the consumer thread

      /// <summary>The index of the current tail in the segment.</summary>
      internal volatile Int32 _last;

      /// <summary>Padding to reduce false sharing with the last and what's after the segment.</summary>
      internal PaddingFor32 _pad2;
    }

    #endregion

    #region * class SingleProducerSingleConsumerQueue_DebugView *

    /// <summary>Debugger type proxy for a SingleProducerSingleConsumerQueue of T.</summary>
    private sealed class SingleProducerSingleConsumerQueue_DebugView
    {
      /// <summary>The queue being visualized.</summary>
      private readonly SingleProducerSingleConsumerQueue<T> _queue;

      /// <summary>Initializes the debug view.</summary>
      /// <param name="queue">The queue being debugged.</param>
      public SingleProducerSingleConsumerQueue_DebugView(SingleProducerSingleConsumerQueue<T> queue)
      {
        Debug.Assert(queue != null, "Expected a non-null queue.");
        _queue = queue;
      }

      /// <summary>Gets the contents of the list.</summary>
      [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
      public T[] Items
      {
        get
        {
          var list = new List<T>();
          foreach (T item in _queue) { list.Add(item); }
          return list.ToArray();
        }
      }
    }

    #endregion
  }

  #endregion

  #region ==& class PaddingHelpers &==

  /// <summary>A placeholder class for common padding constants and eventually routines.</summary>
  internal static class PaddingHelpers
  {
    /// <summary>A size greater than or equal to the size of the most common CPU cache lines.</summary>
    internal const Int32 CACHE_LINE_SIZE = 128;
  }

  #endregion

  #region == struct PaddingFor32 ==

  /// <summary>Padding structure used to minimize false sharing in SingleProducerSingleConsumerQueue{T}.</summary>
  [StructLayout(LayoutKind.Explicit, Size = PaddingHelpers.CACHE_LINE_SIZE - sizeof(Int32))]
  internal struct PaddingFor32 { }

  #endregion
}
#endif